package kafka_go

import (
	"fmt"
	sarama "gitee.com/tym_hmm/go-kafa-shopify-sarama"
	"strings"
)

// BalanceType identifies the balance strategy used by the consumer builder.
type BalanceType int

// AckLevel is an integer type declared alongside BalanceType.
// NOTE(review): presumably an acknowledgement level; it is not referenced in
// this part of the file — confirm its use elsewhere.
type AckLevel int

// Balance strategies understood by the consumer builder. The numeric values
// are 0, 1 and 2, in declaration order.
const (
	// CONSUMER_BALANCE_STRATEGY_ROUNDROBIN selects round-robin balancing and is
	// the default. Original note: the consumer connection pool approach
	// failed, so only round-robin is used for now.
	CONSUMER_BALANCE_STRATEGY_ROUNDROBIN BalanceType = iota
	// CONSUMER_BALANCE_STRATEGY_STICKY selects sticky balancing.
	CONSUMER_BALANCE_STRATEGY_STICKY
	// CONSUMER_BALANCE_STRATEGY_RANGE selects range balancing (the builder maps
	// it to sarama.BalanceStrategyRange).
	CONSUMER_BALANCE_STRATEGY_RANGE
)

// NewBuildConsumer creates a consumer builder for the given broker address,
// group id and topic, using the round-robin balance strategy. It delegates to
// NewBuildConsumerStrategy.
func NewBuildConsumer(addr string, groupId string, topic string) BuildConsumerApi {
	const defaultBalance = CONSUMER_BALANCE_STRATEGY_ROUNDROBIN
	return NewBuildConsumerStrategy(addr, groupId, topic, defaultBalance)
}

// NewBuildConsumerStrategy creates a consumer builder with an explicit balance
// strategy. It delegates to NewNewBuildConsumerOffset with oldOffset set to
// true, i.e. the "pull historical messages" flag is on by default.
func NewBuildConsumerStrategy(addr string, groupId string, topic string, balanceType BalanceType) BuildConsumerApi {
	const pullHistory = true
	return NewNewBuildConsumerOffset(addr, groupId, topic, balanceType, pullHistory)
}

// NewNewBuildConsumerOffset creates a consumer builder and lets the caller
// decide, via oldOffset, whether historical messages should be pulled.
//
// Defaults set here: debug off, multiple partitions off, auto commit on,
// Kafka version "unknown", and a fresh sarama.NewConfig().
func NewNewBuildConsumerOffset(addr string, groupId string, topic string, balanceType BalanceType, oldOffset bool) BuildConsumerApi {
	builder := new(BuildConsumer)

	// Caller-supplied values.
	builder.addr = addr
	builder.groupId = groupId
	builder.topic = topic
	builder.balanceType = balanceType
	builder.oldOffset = oldOffset

	// Defaults; isDebug and isMultiplePartition keep their zero value (false).
	builder.isAutoCommit = true
	builder.kafkaVersion = "unknown"
	builder.config = sarama.NewConfig()

	return builder
}

// BuildConsumer holds the settings collected by the consumer builder. Its
// fields are written by the constructors and Set* methods and read back
// through the getter methods.
type BuildConsumer struct {
	// Debug flag; written by SetDebug and read by IsDebug.
	isDebug bool
	// Broker address; for a cluster, separate the addresses with commas.
	addr string
	// Consumer group id.
	groupId string
	// Topic.
	topic string
	// Balance strategy; round-robin is the default.
	balanceType BalanceType
	// Whether multiple partitions are used.
	isMultiplePartition bool
	// Whether to pull historical messages on start-up. The value is whatever
	// the constructor receives; NewBuildConsumer and NewBuildConsumerStrategy
	// pass true.
	oldOffset bool

	// Whether offsets are committed automatically; NewNewBuildConsumerOffset
	// sets this to true.
	// NOTE(review): the original note reads "when using auto commit the offset
	// must be committed manually"; it presumably means "when NOT using auto
	// commit" — confirm.
	isAutoCommit bool

	// Whether to commit transactionally.
	// NOTE(review): not read or written in the visible part of this file.
	isTransCommit bool

	// Listener for message-response events.
	responseListener ConsumerResponseListener

	// Kafka version string.
	kafkaVersion string

	// Additional sarama configuration.
	config *sarama.Config
}

// SetDebug stores the debug flag and returns the builder so calls can be
// chained.
func (b *BuildConsumer) SetDebug(isDebug bool) BuildConsumerApi {
	b.isDebug = isDebug
	return b
}
// IsDebug returns the builder's debug flag.
func (b *BuildConsumer) IsDebug() bool {
	debug := b.isDebug
	return debug
}
// GetAddr returns the broker address exactly as it was supplied (no trimming
// or splitting is applied here).
func (b *BuildConsumer) GetAddr() string {
	address := b.addr
	return address
}

// GetGroupId returns the consumer group id with leading and trailing
// whitespace removed.
func (b *BuildConsumer) GetGroupId() string {
	trimmed := strings.TrimSpace(b.groupId)
	return trimmed
}

// GetTopic returns the topic with leading and trailing whitespace removed.
func (b *BuildConsumer) GetTopic() string {
	trimmed := strings.TrimSpace(b.topic)
	return trimmed
}

// GetBalanceType returns the balance strategy currently set on the builder.
func (b *BuildConsumer) GetBalanceType() BalanceType {
	strategy := b.balanceType
	return strategy
}

// IsMultiplePartition returns the builder's multiple-partition flag.
func (b *BuildConsumer) IsMultiplePartition() bool {
	multiple := b.isMultiplePartition
	return multiple
}

// IsOldOffset returns the flag that records whether historical messages
// should be pulled.
func (b *BuildConsumer) IsOldOffset() bool {
	pullHistory := b.oldOffset
	return pullHistory
}

// IsAutoCommit returns the builder's auto-commit flag.
func (b *BuildConsumer) IsAutoCommit() bool {
	autoCommit := b.isAutoCommit
	return autoCommit
}

// GetKafkaVersion returns the Kafka version string stored on the builder.
func (b *BuildConsumer) GetKafkaVersion() string {
	version := b.kafkaVersion
	return version
}

// GetConfig returns the sarama configuration produced by _getConfig, which
// writes the builder's settings onto the stored config before returning it.
func (b *BuildConsumer) GetConfig() *sarama.Config {
	cfg := b._getConfig()
	return cfg
}

// SetKafkaVersion stores the Kafka version string and returns the builder so
// calls can be chained. The string is stored as given; no parsing happens
// here.
func (b *BuildConsumer) SetKafkaVersion(kafkaVersion string) BuildConsumerApi {
	b.kafkaVersion = kafkaVersion
	return b
}

// SetMultiplePartition sets whether multiple partitions should be loaded and
// returns the builder so calls can be chained.
func (b *BuildConsumer) SetMultiplePartition(isMultiplePartition bool) BuildConsumerApi {
	b.isMultiplePartition = isMultiplePartition
	return b
}

// SetConfig replaces the builder's sarama configuration and returns the
// builder so calls can be chained. The builder's own settings are written
// onto this config when GetConfig is called, so callers should pass a
// non-nil config.
func (b *BuildConsumer) SetConfig(config *sarama.Config) BuildConsumerApi {
	b.config = config
	return b
}

// SetBalanceType sets the balance strategy and returns the builder so calls
// can be chained.
func (b *BuildConsumer) SetBalanceType(balanceType BalanceType) BuildConsumerApi {
	b.balanceType = balanceType
	return b
}

// SetResponseListener stores the listener for message-response events and
// returns the builder so calls can be chained.
func (b *BuildConsumer) SetResponseListener(responseListener ConsumerResponseListener) BuildConsumerApi {
	b.responseListener = responseListener
	return b
}

// GetResponseListener returns the message-response listener stored on the
// builder.
func (b *BuildConsumer) GetResponseListener() ConsumerResponseListener {
	listener := b.responseListener
	return listener
}

// SetIsAutoCommit sets the auto-commit flag and returns the builder so calls
// can be chained. The flag is copied to Consumer.Offsets.AutoCommit.Enable
// when the sarama config is built.
func (b *BuildConsumer) SetIsAutoCommit(autoCommit bool) BuildConsumerApi {
	b.isAutoCommit = autoCommit
	return b
}

// ToString renders the builder's settings as a single human-readable line.
//
// The sarama config is dereferenced so that its fields are printed rather
// than a pointer. A nil config (possible after SetConfig(nil)) is printed as
// "<nil>" instead of panicking on the dereference.
func (b *BuildConsumer) ToString() string {
	// A nil interface value is formatted by %v as "<nil>".
	var config interface{}
	if b.config != nil {
		config = *b.config
	}
	return fmt.Sprintf(
		"addr:%s, groupId:%s, topic:%s, balanceType:%v, oldOffset:%t, kafkaVersion:%s, isAutoCommit:%t,isMultiplePartition:%t, config:%v",
		b.addr,
		b.groupId,
		b.topic,
		b.balanceType,
		b.oldOffset,
		b.kafkaVersion,
		b.isAutoCommit,
		b.isMultiplePartition,
		config,
	)
}

// _getConfig writes the builder's settings onto the stored sarama config and
// returns that config. The stored config is mutated in place on every call:
//
//   - Consumer.Offsets.AutoCommit.Enable is set from the auto-commit flag.
//   - Consumer.Offsets.Initial is always set to sarama.OffsetNewest.
//   - Consumer.Group.Rebalance.GroupStrategies is set to the single strategy
//     matching the balance type; unrecognised values fall back to round-robin.
//
// If the stored config is nil (possible after SetConfig(nil)), a fresh
// sarama.NewConfig() is installed first so the assignments below cannot
// dereference a nil pointer.
//
// NOTE(review): oldOffset is not consulted here, so the initial offset is
// OffsetNewest regardless of that flag — confirm that the consumer applies
// IsOldOffset() elsewhere.
func (b *BuildConsumer) _getConfig() *sarama.Config {
	if b.config == nil {
		b.config = sarama.NewConfig()
	}
	cfg := b.config

	cfg.Consumer.Offsets.AutoCommit.Enable = b.isAutoCommit
	cfg.Consumer.Offsets.Initial = sarama.OffsetNewest

	// Round-robin is both the explicit default and the fallback for unknown
	// balance types, so only the other strategies need a case.
	var strategy sarama.BalanceStrategy = sarama.BalanceStrategyRoundRobin
	switch b.GetBalanceType() {
	case CONSUMER_BALANCE_STRATEGY_STICKY:
		strategy = sarama.BalanceStrategySticky
	case CONSUMER_BALANCE_STRATEGY_RANGE:
		strategy = sarama.BalanceStrategyRange
	}
	cfg.Consumer.Group.Rebalance.GroupStrategies = []sarama.BalanceStrategy{strategy}

	return cfg
}
